Netty(四):Netty模式 |
您所在的位置:网站首页 › ios 泛型 › Netty(四):Netty模式 |
文章目录案例异步模型Future 说明Future-Listener 机制HTTP服务 Netty 主要基于主从 Reactors 多线程模型(如图)做了一定的改进,其中主从 Reactor 多线程模型有多个 Reactor BossGroup 线程维护 Selector,只关注 Accecpt当接收到 Accept 事件,获取到对应的 SocketChannel,封装成 NIOScoketChannel 并注册到 Worker 线程(事件循环),并进行维护当 Worker 线程监听到 Selector 中通道发生自己感兴趣的事件后,就进行处理(就由 handler),注意 handler 已经加入到通道 进阶版 详细版 1.Netty抽象出两组线程池,BossGroup 负责客户端连接,WorkerGroup 负责网络的读写 2.BossGroup 和 WorkerGroup 类型都是NIOEventLoopGroup 3.NIOEventLoop 相当于一个事件循环组,这个组包含多个事件循环,每一个循环都是NIOEventLoop 4.NIOEventLoop 表示一个不断循环的执行处理任务的线程.每个NIOEventLoop都有一个Selector,用于监听绑定在其上的socket网络通讯 5.NioEventLoopGroup 可以有多个线程,即可以包含多个NioEventLoop 6.每个BossNioEventLoop 循环执行步骤有3步 轮询accept事件 处理accept事件,与client建立连接,生成NioScoketChannel,并将其注册到某个worker,NIOEventLoop上的Selectot 处理任务队列的任务,即runAllTasks 7.每个worker NIOEventLoop循环执行的步骤 轮询read,write事件 处理IO事件,即read,write事件,在对应NIOSocketChannel处理 处理任务队列的任务,即runALlTasks 8.每个Worker,NIOEventLoop处理业务时,会使用pipeline,pipline中包含了channel,通过pipline可以获取到对应管道,管道中维护了很多的处理器 案例Netty 服务器在 6668 端口监听,客户端能发送消息给服务器"hello,服务器~" 服务器可以回复消息给客户端"hello,客户端~" NettyServer package com.zyd.netty;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;public class NettyServer { public static void main(String[] args) { /* 1.创建BossGroup 和 WorkerGroup 2. BossGroup只处理连接请求真正和客户端业务处理,交给WorkerGroup完成 3.两个都是无限循环 4. bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数 实际 cpu核数 *2 */ EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); //8 try { //创建服务器端启动对象,配置参数 ServerBootstrap bootstrap = new ServerBootstrap(); //使用链式编程进行设置 bootstrap.group(bossGroup, workerGroup)//设置两个线程组 .channel(NioServerSocketChannel.class) //使用NioSocketChannel作为服务器通道实现 .option(ChannelOption.SO_BACKLOG, 128) //设置线程队列得到连接个数 .childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态 .childHandler(new ChannelInitializer() { //创建一个通道初始化对象 protected void initChannel(SocketChannel socketChannel) throws Exception { //可以使用一个集合管理 SocketChannel, 再推送消息时,可以将业务加入到各个channel 对应的 NIOEventLoop 的 taskQueue 或者 scheduleTaskQueue System.out.println("客户socketChannel hashCode = " + socketChannel.hashCode()); socketChannel.pipeline().addLast(new NettyServerHandler()); } });// 给我们的workerGroup 的 EventLoop 对应的管道设置处理器 System.out.println("服务器 is ready "); //绑定一个端口,并且同步,生成一个ChannelFuture //启动服务器(并绑定端口) ChannelFuture cf = bootstrap.bind(6668).sync(); //给cf注册监听器,监控我们关心的事件 cf.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture channelFuture) throws Exception { if (channelFuture.isSuccess()) { System.out.println("监听端口 6668 成功"); } else { System.out.println("监听端口 6668 失败"); } } }); cf.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }}NettyServerHandler package com.zyd.netty;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.Channel;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.channel.ChannelPipeline;import io.netty.util.CharsetUtil;/** * 自定义一个Handler,需要继续netty,规定好的某个HandlerAdapter * */public class NettyServerHandler extends ChannelInboundHandlerAdapter { // 读取数据实际 /* 1.ChannelHandlerContext ctx:上下文对象,含有 管道pipline,通道channel,地址 2. Object msg: 就是客户端发送的数据 默认Object */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("服务器读取线程 " + Thread.currentThread().getName() + " channle =" + ctx.channel()); System.out.println("server ctx =" + ctx); System.out.println("看看channel 和 pipeline的关系"); Channel channel = ctx.channel(); ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链接, 出站入站 //将 msg 转成一个 ByteBuf //ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer. ByteBuf buf = (ByteBuf) msg; System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8)); System.out.println("客户端地址:" + channel.remoteAddress()); } //数据读取完毕 @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //writeAndFlush 是 write + flush //将数据写入到缓存,并刷新 //一般讲,我们对这个发送的数据进行编码 ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^^ω^^ω^^ω^^ω^^ω^ |
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |